/**
 * FileName: StreamingFilterMap
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/3/18 10:51
 * Description: 进行数据过滤和映射
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.domain.Xdata;
import cn.com.bonc.filter.FilterChain;
import cn.com.bonc.filter.impl.DefaultFilterImpl;
import cn.com.bonc.filter.impl.ExternalJsonFilterImpl;
import cn.com.bonc.util.DataFilterAndOperatorUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class StreamingFilterMap {
    public static void main(String[] args) {

        /**
         * SparkSession 是 Spark SQL 的入口。
         * 使用 Dataset 或者 Datafram 编写 Spark SQL 应用的时候，第一个要创建的对象就是 SparkSession。
         */
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("FilterMapApp")
                .getOrCreate();
        /**
         * 从kafka读取数据，为流式查询创建Kafka源
         */
        Dataset<Row> kafkaDataset = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers",ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
                .option("subscribe", ConfigurationManager.getProperty(Constants.KAFKA_TOPICS))
                .option("startingOffsets", ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET))
                .load();

        Dataset<String> stringDataset = kafkaDataset
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());

        /**
         * 使用filter方法进行数据过滤，只有数据元素大于5个数据才会被保留
         * 注意：lambda表达式需要强制类型转换
         */
        Dataset<String> filterResult = stringDataset.filter((FilterFunction<String>) line -> {
            String[] lines = StringUtils.splitPreserveAllTokens(line, "|");
            if (lines.length ==4) {
                return true;
            }
            return false;
        });

        /**
         * 使用map方法进行映射，将部分字符元素封装成Xdata实体类
         */
        Dataset<Xdata> mapResult = filterResult.map((MapFunction<String, Xdata>) line -> {
            String[] lines = StringUtils.splitPreserveAllTokens(line, "|");
            return new Xdata(lines[0], lines[1]);
        }, Encoders.bean(Xdata.class));

        mapResult.printSchema();

        /**
         * 使用 DataFilterAndOperatorUtil 对数据进行操作
         * 本质上此工具类是对map 和filter方法的实现，将筛选的条件和对数据的操作交给外部配置文件
         */
        DataFilterAndOperatorUtil.getInstance().filter(stringDataset);

        /**
         * 使用FilterChain进行多次过滤处理
         * 通过使用Filter接口，在实现类中对数据进行处理，最终形成过滤链的形式
         */
        FilterChain.setSourceData(kafkaDataset)
                .addFilter(new ExternalJsonFilterImpl())
                .addFilter(new DefaultFilterImpl())
                .execute();
        /**
         * 打印结果输出到控制台
         */
		StreamingQuery query = mapResult
                .writeStream()
				.format("console")
				.outputMode(OutputMode.Append())
				.option("truncate", "false")
				.start();

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }

    }
}
